import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

//import java.nio.file.Path;

/**
 * Flink entry point that wires a Kafka string source into a keyed, windowed pipeline.
 *
 * <p>Messages are read from Kafka, split by {@code MessageSplitter}, keyed by tuple
 * field 0, grouped into 2-second event-time windows and handed to
 * {@code WindowFunction2}. The window results are both printed and written to a
 * local directory through a {@link StreamingFileSink}.
 *
 * <p>The Kafka topic is taken from the first program argument when one is supplied;
 * otherwise {@code test-0921} is used.
 *
 * <p>Created: 2018/9/21 16:51
 *
 * @author 0262000099 Hengtai Nie
 */
public class KafkaMessageStreaming
{

    /** Topic consumed when no topic is passed on the command line. */
    private static final String DEFAULT_TOPIC = "test-0921";

    /** Directory the window results are written to. */
    private static final String SINK_DIRECTORY = "/home/appleyuchi/bigdata/flink_sink";

    /**
     * Builds the streaming job and submits it for execution.
     *
     * @param args optional program arguments; {@code args[0]}, if present and
     *             non-blank, is the Kafka topic to consume
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception
    {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Crucial: checkpointing (here every 5000 ms) must be enabled. According to the
        // Flink documentation, StreamingFileSink only finalizes part files on
        // successful checkpoints.
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "Desktop:9091,Laptop:9092,Laptop:9093");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty("group.id", "appleyuchi");

        // The "监控N" lines are progress markers. They are printed while the job is
        // being assembled, i.e. before env.execute() is called below.
        System.out.println("---------------------监控1------------------------------");

        // The topic may be passed as the first program argument.
        final String topic = resolveTopic(args);
        final FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);

        consumer.setStartFromEarliest();
        // Timestamps and watermarks are assigned at the source by MessageWaterEmitter
        // (declared outside this file).
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
        System.out.println(consumer);

        System.out.println("---------------------监控2------------------------------");

        final DataStream<Tuple2<String, Long>> windowedResults = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(2))
                .apply(new WindowFunction2());
        windowedResults.print();

        System.out.println("---------------------监控3------------------------------");

        // Note: the output ends up in hidden files under SINK_DIRECTORY.
        final StreamingFileSink<Tuple2<String, Long>> fileSink = StreamingFileSink
                .forRowFormat(
                        new Path(SINK_DIRECTORY),
                        new SimpleStringEncoder<Tuple2<String, Long>>("utf-8"))
                .build();
        windowedResults.addSink(fileSink);

        System.out.println("---------------------监控4------------------------------");

        env.execute("Kafka-Flink Test");
    }

    /**
     * Picks the Kafka topic to consume.
     *
     * @param args the program arguments, possibly {@code null} or empty
     * @return the trimmed first argument when it is present and non-blank,
     *         otherwise {@link #DEFAULT_TOPIC}
     */
    private static String resolveTopic(String[] args)
    {
        if (args != null && args.length > 0 && args[0] != null && !args[0].trim().isEmpty())
        {
            return args[0].trim();
        }
        return DEFAULT_TOPIC;
    }
}